package com.wugui.datax.executor.service.jobhandler;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.IdUtil;
import com.wugui.datatx.core.biz.model.HandleProcessCallbackParam;
import com.wugui.datatx.core.biz.model.ReturnT;
import com.wugui.datatx.core.biz.model.TriggerParam;
import com.wugui.datatx.core.handler.IJobHandler;
import com.wugui.datatx.core.handler.annotation.JobHandler;
import com.wugui.datatx.core.log.JobLogger;
import com.wugui.datatx.core.thread.ProcessCallbackThread;
import com.wugui.datatx.core.util.ProcessUtil;
import com.wugui.datax.executor.service.logparse.LogStatistics;
import com.wugui.datax.executor.util.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.*;
import java.util.concurrent.FutureTask;

import static com.wugui.datax.executor.service.command.BuildCommand.buildDataXExecutorCmd;
import static com.wugui.datax.executor.service.jobhandler.DataXConstant.DEFAULT_JSON;
import static com.wugui.datax.executor.service.logparse.AnalysisStatistics.analysisStatisticsLog;

/**
 * DataX任务运行
 *
 * @author jingwk 2019-11-16
 */
@JobHandler(value = "executorJobHandler")
@Component
public class ExecutorJobHandler extends IJobHandler {

  @Value("${datax.executor.jsonpath}")
  private String jsonPath;

  @Value("${datax.pypath}")
  private String dataXPyPath;

  @Override
  public ReturnT<String> execute(TriggerParam trigger) {

    int exitValue = -1;
    Thread errThread = null;
    String tmpFilePath;
    LogStatistics logStatistics = null;
    // Generate JSON temporary file
    tmpFilePath = generateTemJsonFile(trigger.getJobJson());

    try {
      String[] cmdarrayFinal = buildDataXExecutorCmd(trigger, tmpFilePath, dataXPyPath);
      final Process process = Runtime.getRuntime().exec(cmdarrayFinal);
      String prcsId = ProcessUtil.getProcessId(process);
      JobLogger.log("------------------DataX process id: " + prcsId);
      jobTmpFiles.put(prcsId, tmpFilePath);
      // update datax process id
      HandleProcessCallbackParam prcs =
          new HandleProcessCallbackParam(trigger.getLogId(), trigger.getLogDateTime(), prcsId);
      ProcessCallbackThread.pushCallBack(prcs);
      // log-thread
      Thread futureThread = null;
      FutureTask<LogStatistics> futureTask =
          new FutureTask<>(
              () -> analysisStatisticsLog(new BufferedInputStream(process.getInputStream())));
      futureThread = new Thread(futureTask);
      futureThread.start();

      errThread =
          new Thread(
              () -> {
                try {
                  analysisStatisticsLog(new BufferedInputStream(process.getErrorStream()));
                } catch (IOException e) {
                  JobLogger.log(e);
                }
              });

      logStatistics = futureTask.get();
      errThread.start();
      // process-wait
      exitValue = process.waitFor(); // exit code: 0=success, 1=error
      // log-thread join
      errThread.join();
    } catch (Exception e) {
      JobLogger.log(e);
    } finally {
      if (errThread != null && errThread.isAlive()) {
        errThread.interrupt();
      }
      //  删除临时文件
      if (FileUtil.exist(tmpFilePath)) {
        FileUtil.del(new File(tmpFilePath));
      }
    }
    if (exitValue == 0) {
      return new ReturnT<>(200, logStatistics.toString());
    } else {
      return new ReturnT<>(
          IJobHandler.FAIL.getCode(), "command exit value(" + exitValue + ") is failed");
    }
  }

  private String generateTemJsonFile(String jobJson) {
    String tmpFilePath;
    String dataXHomePath = SystemUtils.getDataXHomePath();
    if (StringUtils.isNotEmpty(dataXHomePath)) {
      jsonPath = dataXHomePath + DEFAULT_JSON;
    }
    if (!FileUtil.exist(jsonPath)) {
      FileUtil.mkdir(jsonPath);
    }
    tmpFilePath = jsonPath + "jobTmp-" + IdUtil.simpleUUID() + ".conf";
    // 根据json写入到临时本地文件
    try (PrintWriter writer = new PrintWriter(tmpFilePath, "UTF-8")) {
      writer.println(jobJson);
    } catch (FileNotFoundException | UnsupportedEncodingException e) {
      JobLogger.log("JSON 临时文件写入异常：" + e.getMessage());
    }
    return tmpFilePath;
  }
}
